use std::collections::HashMap;

use log::error;
use mysql::prelude::*;
use mysql::Error::MySqlError;
use mysql::{params, Opts, OptsBuilder, Pool, PooledConn, Row, ServerError};
use serde_json::{json, Value};
use strfmt::strfmt;

use super::{IDatabase, ERR_TABLE_NOT_EXISTS, TABLE_TASKS};
use crate::datasource::record::{IndexeaTask, Record};
use crate::datasource::{DataSource, IDataSource};
use crate::task::Task;

/// MySQL-backed data source.
///
/// Wraps a `mysql` connection pool; the `IDataSource` and `IDatabase`
/// implementations in this file run all of their queries through it.
pub struct MySQL {
    /// Connection pool created in `IDataSource::new` from the configured URL.
    pub pool: Pool,
}

impl IDataSource for MySQL {
    fn new(ds: &DataSource) -> Self {
        let db_url = format!(
            "{}{}pool_min=1&pool_max=5&tcp_connect_timeout_ms=1000",
            ds.url,
            if ds.url.find('?').is_some() { '&' } else { '?' }
        );
        let opts = Opts::from_url(db_url.as_str()).unwrap();
        let opts_builder = OptsBuilder::from_opts(opts);
        let pool = Pool::new::<OptsBuilder, _>(opts_builder).unwrap();
        MySQL { pool }
    }

    fn init(&self, tasks: &HashMap<&String, &Task>) -> Result<(), String> {
        self.init_db(tasks)
    }

    fn ping(&self) -> Result<bool, String> {
        Ok(self.conn().as_mut().ping())
    }

    fn records(
        &self,
        task: &Task,
        limit: u32,
        offset: u32,
        scroll_id: u64,
    ) -> Result<Vec<serde_json::Value>, String> {
        let sql = self.list_records_sql(task, limit, offset, scroll_id);
        let mut records = Vec::new();
        match self.conn().query::<Row, String>(sql) {
            Ok(rows) => {
                for row in rows {
                    let value = MySQL::row_to_json_value(task, &row);
                    records.push(value);
                }
            }
            Err(e) => return Err(e.to_string()),
        }
        Ok(records)
    }

    fn tasks(&self, name: &String, task: &Task, count: i32) -> Result<Vec<Record>, String> {
        let mut conn = self.conn();
        match conn.exec_map(
            "SELECT * FROM indexea_tasks WHERE `task` = ? AND `status` = 0 ORDER BY `id` LIMIT ?",
            (name, count),
            |(id, task, field, value, ftype, ops, status, created, updated)| IndexeaTask {
                id,
                task,
                field,
                value,
                ftype,
                ops,
                status,
                created,
                updated,
            },
        ) {
            Ok(tasks) => {
                let mut records = Vec::new();
                for t_record in tasks {
                    // read raw record
                    if t_record.ops == 3 {
                        // delete record not need to get raw record, composite value from task
                        let task_clone = t_record.clone();
                        records.push(Record {
                            value: serde_json::Value::String(t_record.value),
                            task: task_clone,
                            index: String::new(),
                        });
                    } else {
                        let sql = self.get_select_sql(task, &t_record);
                        match conn.query_first::<Row, &String>(&sql) {
                            Ok(option_row) => match option_row {
                                Some(row) => {
                                    let value = MySQL::row_to_json_value(task, &row);
                                    records.push(Record {
                                        value,
                                        task: t_record.clone(),
                                        index: String::new(),
                                    });
                                }
                                _ => {
                                    //The task refer to object not exists
                                    records.push(Record {
                                        value: serde_json::Value::Null,
                                        task: t_record.clone(),
                                        index: String::new(),
                                    });
                                }
                            },
                            Err(_e) => {
                                error!(
                                    "failed to get record from table [{}], reason: {:?}",
                                    task.table.as_ref().unwrap(),
                                    _e
                                );
                                break;
                            }
                        }
                    }
                }
                Ok(records)
            }
            Err(e) => match e {
                MySqlError(me) => {
                    if me.code == ServerError::ER_NO_SUCH_TABLE as u16 {
                        panic!("[mysql] {}", ERR_TABLE_NOT_EXISTS)
                    }
                    Err(me.message)
                }
                _ => Err(e.to_string()),
            },
        }
    }

    fn finish(&self, records: &Vec<Record>) -> Result<(), String> {
        let mut conn = self.conn();
        let mut err = false;
        for rec in records {
            let stmt = if rec.task.status == 1 {
                conn.prep("DELETE FROM indexea_tasks WHERE id = :id")
            } else {
                conn.prep("UPDATE indexea_tasks SET status = :status, updated=NOW() WHERE id = :id")
            };
            if let Err(e) = conn.exec_drop(
                &stmt.unwrap(),
                params! {
                                "status" => rec.task.status,
                                "id" => rec.task.id,
                },
            ) {
                err = true;
                error!(
                    "update task [id={}] status({}) failed, reason: {:?}",
                    rec.task.id, rec.task.status, e
                )
            }
        }
        if err {
            return Err(String::from("update tasks status failed, Please see the log for details"));
        }
        Ok(())
    }

    fn clean(&self, tasks: &HashMap<&String, &Task>) -> Result<(), String> {
        self.clean_db(tasks)
    }
}

impl IDatabase for MySQL {
    /// check table `indexea_tasks` exists?
    fn tasks_table_exists(&self) -> Result<bool, String> {
        let sql = format!("SELECT COUNT(id) FROM {}", TABLE_TASKS);
        //check table exists
        let res = self.conn().query_first::<u32, &String>(&sql);
        if let Err(MySqlError(e)) = res {
            if e.code == ServerError::ER_NO_SUCH_TABLE as u16 {
                return Ok(false);
            }
            return Err(e.message);
        } else {
            Ok(true)
        }
    }

    /// create indexea_tasks table
    fn create_tasks_table(&self) -> Result<(), String> {
        let sql = r#"
                        CREATE TABLE IF NOT EXISTS `indexea_tasks` (
                          `id` bigint unsigned NOT NULL AUTO_INCREMENT,
                          `task` varchar(32) NOT NULL,
                          `field` varchar(32) NOT NULL,
                          `value` varchar(64) NOT NULL,
                          `ftype` tinyint NOT NULL,
                          `ops` tinyint NOT NULL,
                          `status` tinyint NOT NULL DEFAULT '0',
                          `created` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
                          `updated` datetime NULL,
                          PRIMARY KEY (`id`),
                          KEY `indexea_tasks_status` (`task`,`status`)
                        )
                    "#;
        if let Err(MySqlError(e)) = self.conn().exec_drop(sql, ()) {
            return Err(e.to_string());
        }
        Ok(())
    }

    fn drop_tasks_table(&self) -> Result<(), String> {
        let sql = "DROP TABLE IF EXISTS `indexea_tasks`";
        if let Err(MySqlError(e)) = self.conn().exec_drop(sql, ()) {
            return Err(e.to_string());
        }
        Ok(())
    }

    /// create triggers for indexer
    fn create_triggers(&self, name: &String, task: &Task) -> Result<(), String> {
        let mut conn = self.conn();

        //check primary field type
        let field_type = 1.to_string();

        let mut vars = HashMap::new();
        vars.insert("task".to_string(), name.as_str());
        vars.insert("table".to_string(), task.table.as_ref().unwrap().as_str());
        vars.insert("field".to_string(), task.primary.as_ref().unwrap().as_str());
        vars.insert("type".to_string(), field_type.as_str());

        let sql = r#"
        DROP TRIGGER IF EXISTS indexea_after_insert_for_{task};
        CREATE TRIGGER indexea_after_insert_for_{task}
            AFTER INSERT
            ON {table} FOR EACH ROW
            INSERT INTO indexea_tasks(`task`,`field`,`value`,`ftype`,`ops`) VALUES('{task}', '{field}', NEW.{field}, {type}, 1);
        "#;
        let sql = strfmt(&sql, &vars).unwrap();
        if let Err(MySqlError(e)) = conn.query_drop(sql) {
            return Err(e.to_string());
        }

        let sql = r#"
        DROP TRIGGER IF EXISTS indexea_after_update_for_{task};
        CREATE TRIGGER indexea_after_update_for_{task}
            AFTER UPDATE
            ON {table} FOR EACH ROW
            INSERT INTO indexea_tasks(`task`,`field`,`value`,`ftype`,`ops`) VALUES('{task}', '{field}', OLD.{field}, {type}, 2);
        "#;
        let sql = strfmt(&sql, &vars).unwrap();
        if let Err(MySqlError(e)) = conn.query_drop(sql) {
            return Err(e.to_string());
        }

        let sql = r#"
        DROP TRIGGER IF EXISTS indexea_after_delete_for_{task};
        CREATE TRIGGER indexea_after_delete_for_{task}
            AFTER DELETE
            ON {table} FOR EACH ROW
            INSERT INTO indexea_tasks(`task`,`field`,`value`,`ftype`,`ops`) VALUES('{task}', '{field}', OLD.{field}, {type}, 3);
        "#;
        let sql = strfmt(&sql, &vars).unwrap();
        if let Err(MySqlError(e)) = conn.query_drop(sql) {
            return Err(e.to_string());
        }

        Ok(())
    }

    fn drop_triggers(&self, name: &String, _task: &Task) -> Result<(), String> {
        let sql = r#"
        DROP TRIGGER IF EXISTS indexea_after_insert_for_{task};
        DROP TRIGGER IF EXISTS indexea_after_update_for_{task};
        DROP TRIGGER IF EXISTS indexea_after_delete_for_{task};
        "#;
        let mut vars = HashMap::new();
        vars.insert("task".to_string(), name.as_str());
        let sql = strfmt(&sql, &vars).unwrap();

        if let Err(MySqlError(e)) = self.conn().query_drop(sql) {
            return Err(e.to_string());
        }

        Ok(())
    }
}

impl MySQL {
    /// Checks a connection out of the pool.
    ///
    /// # Panics
    ///
    /// Panics with "Failed to get connection from pool" when the pool cannot
    /// provide a connection.
    fn conn(&self) -> PooledConn {
        self.pool.get_conn().expect("Failed to get connection from pool")
    }

    /// Converts a MySQL row into a JSON object keyed by column name.
    ///
    /// * Numeric columns become JSON numbers (see [`Self::numeric_to_json`]).
    /// * Other columns become JSON strings (see [`Self::text_to_json`]).
    /// * `NULL` values and values that cannot be converted are left out.
    ///
    /// When the task has a primary field and the row holds a value for it,
    /// that value is copied under the additional key `_id`.
    fn row_to_json_value(task: &Task, row: &Row) -> serde_json::Value {
        let mut data: serde_json::Map<String, serde_json::Value> = serde_json::Map::new();
        for (i, col) in row.columns_ref().iter().enumerate() {
            let cell = match row.get::<mysql::Value, usize>(i) {
                Some(cell) => cell,
                None => continue,
            };
            let converted = if col.column_type().is_numeric_type() {
                Self::numeric_to_json(cell)
            } else {
                Self::text_to_json(cell)
            };
            if let Some(value) = converted {
                // `name_str` is lossy, so an odd column name cannot cause a panic.
                data.insert(col.name_str().into_owned(), value);
            }
        }

        if let Some(primary) = task.primary.as_ref() {
            if let Some(id) = data.get(primary).cloned() {
                data.insert("_id".to_string(), id);
            }
        }
        serde_json::Value::Object(data)
    }

    /// Converts the value of a numeric column to a JSON number.
    ///
    /// Tries `i64`, then `u64` (so unsigned 64-bit values above `i64::MAX`
    /// keep full precision), then `f64`. Returns `None` when none of the
    /// conversions applies, e.g. for `NULL`.
    fn numeric_to_json(value: mysql::Value) -> Option<serde_json::Value> {
        if let Ok(signed) = mysql::from_value_opt::<i64>(value.clone()) {
            return Some(json!(signed));
        }
        if let Ok(unsigned) = mysql::from_value_opt::<u64>(value.clone()) {
            return Some(json!(unsigned));
        }
        mysql::from_value_opt::<f64>(value)
            .ok()
            .map(|float| json!(float))
    }

    /// Converts the value of a non-numeric column to a JSON string.
    ///
    /// Byte values that are not valid UTF-8 (binary columns, for instance) are
    /// converted lossily instead of panicking. Returns `None` for `NULL` and
    /// for values that have no string conversion.
    fn text_to_json(value: mysql::Value) -> Option<serde_json::Value> {
        let text = match value {
            mysql::Value::NULL => None,
            mysql::Value::Bytes(bytes) => Some(String::from_utf8(bytes).unwrap_or_else(|e| {
                String::from_utf8_lossy(e.as_bytes()).into_owned()
            })),
            other => mysql::from_value_opt::<String>(other).ok(),
        };
        text.map(serde_json::Value::String)
    }
}
